package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
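/**
 * Merges segments of approximately equal size, subject to an allowed
 * number of segments per tier.  This policy can merge non-adjacent
 * segments, and it distinguishes the maximum number of segments merged
 * at once ({@link #setMaxMergeAtOnce}) from the allowed number of
 * segments per tier ({@link #setSegmentsPerTier}).
 *
 * <p>For normal merging, the policy first computes a segment "budget"
 * for the index.  If the index holds more segments than the budget
 * allows, it scores candidate merges and picks the candidate with the
 * lowest score: candidates with less size skew, smaller resulting size
 * and more reclaimed deletions score better.  Segments at or above half
 * of {@link #setMaxMergedSegmentMB} are left alone by normal merging.</p>
 *
 * <p>A minimal configuration sketch (the values below are illustrative
 * only, and {@code analyzer} stands for an existing
 * {@code org.apache.lucene.analysis.Analyzer} instance):</p>
 *
 * <pre class="prettyprint">
 * TieredMergePolicy tmp = new TieredMergePolicy();
 * tmp.setSegmentsPerTier(10.0);         // allow ~10 segments per tier
 * tmp.setMaxMergeAtOnce(10);            // merge at most 10 segments at a time
 * tmp.setMaxMergedSegmentMB(5 * 1024);  // cap merged segments at ~5 GB
 *
 * IndexWriterConfig iwc = new IndexWriterConfig(analyzer);
 * iwc.setMergePolicy(tmp);
 * </pre>
 */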
public class TieredMergePolicy extends MergePolicy {
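  /** Default noCFSRatio: if a merged segment will be more than 10% of
   *  the index size, it is left in non-compound format. */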
  public static final double DEFAULT_NO_CFS_RATIO = 0.1;

  private int maxMergeAtOnce = 10;
  private long maxMergedSegmentBytes = 5*1024*1024*1024L;   // 5 GB
  private int maxMergeAtOnceExplicit = 30;

  private long floorSegmentBytes = 2*1024*1024L;            // 2 MB
  private double segsPerTier = 10.0;
  private double forceMergeDeletesPctAllowed = 10.0;
  private double reclaimDeletesWeight = 2.0;

  /** Creates a new merge policy instance with default settings. */
  public TieredMergePolicy() {
    super(DEFAULT_NO_CFS_RATIO, MergePolicy.DEFAULT_MAX_CFS_SEGMENT_SIZE);
  }
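  /** Sets the maximum number of segments to be merged at a time during
   *  "normal" merging.  For explicit merging (forceMerge or
   *  forceMergeDeletes), see {@link #setMaxMergeAtOnceExplicit}.
   *  Default is 10. */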
  public TieredMergePolicy setMaxMergeAtOnce(int v) {
    if (v < 2) {
      throw new IllegalArgumentException("maxMergeAtOnce must be > 1 (got " + v + ")");
    }
    maxMergeAtOnce = v;
    return this;
  }

  public int getMaxMergeAtOnce() {
    return maxMergeAtOnce;
  }
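  /** Sets the maximum number of segments to be merged at a time during
   *  forceMerge or forceMergeDeletes.  Default is 30. */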
  public TieredMergePolicy setMaxMergeAtOnceExplicit(int v) {
    if (v < 2) {
      throw new IllegalArgumentException("maxMergeAtOnceExplicit must be > 1 (got " + v + ")");
    }
    maxMergeAtOnceExplicit = v;
    return this;
  }

  public int getMaxMergeAtOnceExplicit() {
    return maxMergeAtOnceExplicit;
  }
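  /** Sets the approximate maximum size, in MB, of a segment produced
   *  during normal merging.  Segments at or above half this size are not
   *  selected for normal merging.  Default is 5120 (5 GB). */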
  public TieredMergePolicy setMaxMergedSegmentMB(double v) {
    if (v < 0.0) {
      throw new IllegalArgumentException("maxMergedSegmentMB must be >=0 (got " + v + ")");
    }
    v *= 1024 * 1024;
    maxMergedSegmentBytes = v > Long.MAX_VALUE ? Long.MAX_VALUE : (long) v;
    return this;
  }

  public double getMaxMergedSegmentMB() {
    return maxMergedSegmentBytes/1024/1024.;
  }
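  /** Controls how strongly merges that reclaim more deletions are
   *  favored.  Higher values favor selecting merges that reclaim
   *  deletions; a value of 0.0 means deletions do not affect merge
   *  selection.  Default is 2.0. */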
  public TieredMergePolicy setReclaimDeletesWeight(double v) {
    if (v < 0.0) {
      throw new IllegalArgumentException("reclaimDeletesWeight must be >= 0.0 (got " + v + ")");
    }
    reclaimDeletesWeight = v;
    return this;
  }

  public double getReclaimDeletesWeight() {
    return reclaimDeletesWeight;
  }
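  /** Sets the floor segment size, in MB: segments smaller than this are
   *  "rounded up" to this size for merge selection, preventing a long
   *  tail of tiny segments from staying unmerged.  Default is 2.0 MB. */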
  public TieredMergePolicy setFloorSegmentMB(double v) {
    if (v <= 0.0) {
      throw new IllegalArgumentException("floorSegmentMB must be > 0.0 (got " + v + ")");
    }
    v *= 1024 * 1024;
    floorSegmentBytes = v > Long.MAX_VALUE ? Long.MAX_VALUE : (long) v;
    return this;
  }

  public double getFloorSegmentMB() {
    return floorSegmentBytes/(1024*1024.);
  }
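  /** When forceMergeDeletes is called, a segment is only merged away if
   *  its percentage of deleted documents is over this threshold.
   *  Default is 10.0 (percent). */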
  public TieredMergePolicy setForceMergeDeletesPctAllowed(double v) {
    if (v < 0.0 || v > 100.0) {
      throw new IllegalArgumentException("forceMergeDeletesPctAllowed must be between 0.0 and 100.0 inclusive (got " + v + ")");
    }
    forceMergeDeletesPctAllowed = v;
    return this;
  }

  public double getForceMergeDeletesPctAllowed() {
    return forceMergeDeletesPctAllowed;
  }
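  /** Sets the allowed number of segments per tier.  Smaller values mean
   *  more merging but fewer segments.  This should be at least as large
   *  as {@link #setMaxMergeAtOnce}, otherwise too much merging is
   *  forced.  Default is 10.0. */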
  public TieredMergePolicy setSegmentsPerTier(double v) {
    if (v < 2.0) {
      throw new IllegalArgumentException("segmentsPerTier must be >= 2.0 (got " + v + ")");
    }
    segsPerTier = v;
    return this;
  }

  public double getSegmentsPerTier() {
    return segsPerTier;
  }
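  /** Orders segments by byte size, largest first; ties are broken by segment name. */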
  private class SegmentByteSizeDescending implements Comparator<SegmentCommitInfo> {

    private final IndexWriter writer;

    SegmentByteSizeDescending(IndexWriter writer) {
      this.writer = writer;
    }

    @Override
    public int compare(SegmentCommitInfo o1, SegmentCommitInfo o2) {
      try {
        final long sz1 = size(o1, writer);
        final long sz2 = size(o2, writer);
        if (sz1 > sz2) {
          return -1;
        } else if (sz2 > sz1) {
          return 1;
        } else {
          return o1.info.name.compareTo(o2.info.name);
        }
      } catch (IOException ioe) {
        throw new RuntimeException(ioe);
      }
    }
  }
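  /** Holds the score (and a human-readable explanation) computed for a
   *  candidate merge; lower scores are better. */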
  protected static abstract class MergeScore {

    /** Sole constructor, for invocation by subclass constructors. */
    protected MergeScore() {
    }

    /** Returns the score for this merge candidate; lower values are better. */
    abstract double getScore();

    /** Returns a human-readable explanation of the score, used for logging. */
    abstract String getExplanation();
  }
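  /**
   * Selects merges for normal (not explicitly forced) merging.  Segments
   * are sorted by size, segments at or above half of the maximum merged
   * segment size are set aside, and an allowed segment budget is derived
   * from the floored minimum segment size, {@code segsPerTier} and
   * {@code maxMergeAtOnce}.  While the number of eligible segments
   * exceeds the budget, windows of up to {@code maxMergeAtOnce}
   * size-sorted eligible segments are scored via {@link #score} and the
   * best (lowest) scoring candidate is added to the returned
   * specification.
   */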
  @Override
  public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos infos, IndexWriter writer) throws IOException {
    if (verbose(writer)) {
      message("findMerges: " + infos.size() + " segments", writer);
    }
    if (infos.size() == 0) {
      return null;
    }
    final Collection<SegmentCommitInfo> merging = writer.getMergingSegments();
    final Collection<SegmentCommitInfo> toBeMerged = new HashSet<>();

    final List<SegmentCommitInfo> infosSorted = new ArrayList<>(infos.asList());
    Collections.sort(infosSorted, new SegmentByteSizeDescending(writer));

    // Compute the total index bytes and the smallest segment size, and log
    // per-segment details:
    long totIndexBytes = 0;
    long minSegmentBytes = Long.MAX_VALUE;
    for(SegmentCommitInfo info : infosSorted) {
      final long segBytes = size(info, writer);
      if (verbose(writer)) {
        String extra = merging.contains(info) ? " [merging]" : "";
        if (segBytes >= maxMergedSegmentBytes/2.0) {
          extra += " [skip: too large]";
        } else if (segBytes < floorSegmentBytes) {
          extra += " [floored]";
        }
        message(" seg=" + writer.segString(info) + " size=" + String.format(Locale.ROOT, "%.3f", segBytes/1024/1024.) + " MB" + extra, writer);
      }

      minSegmentBytes = Math.min(segBytes, minSegmentBytes);

      totIndexBytes += segBytes;
    }

    // Segments at or over half the max merged size are never selected for
    // normal merging; exclude them from the budget:
    int tooBigCount = 0;
    while (tooBigCount < infosSorted.size()) {
      long segBytes = size(infosSorted.get(tooBigCount), writer);
      if (segBytes < maxMergedSegmentBytes/2.0) {
        break;
      }
      totIndexBytes -= segBytes;
      tooBigCount++;
    }

    minSegmentBytes = floorSize(minSegmentBytes);

    // Compute the allowed segment count: segsPerTier segments per level,
    // with each level maxMergeAtOnce times larger than the previous one:
    long levelSize = minSegmentBytes;
    long bytesLeft = totIndexBytes;
    double allowedSegCount = 0;
    while(true) {
      final double segCountLevel = bytesLeft / (double) levelSize;
      if (segCountLevel < segsPerTier) {
        allowedSegCount += Math.ceil(segCountLevel);
        break;
      }
      allowedSegCount += segsPerTier;
      bytesLeft -= segsPerTier * levelSize;
      levelSize *= maxMergeAtOnce;
    }
    int allowedSegCountInt = (int) allowedSegCount;

    MergeSpecification spec = null;

    // Cycle to possibly select more than one merge:
    while(true) {

      long mergingBytes = 0;

      // Gather eligible segments: segments not already being merged and not
      // already selected by a prior iteration of this loop:
      final List<SegmentCommitInfo> eligible = new ArrayList<>();
      for(int idx = tooBigCount; idx<infosSorted.size(); idx++) {
        final SegmentCommitInfo info = infosSorted.get(idx);
        if (merging.contains(info)) {
          mergingBytes += size(info, writer);
        } else if (!toBeMerged.contains(info)) {
          eligible.add(info);
        }
      }

      final boolean maxMergeIsRunning = mergingBytes >= maxMergedSegmentBytes;

      if (verbose(writer)) {
        message(" allowedSegmentCount=" + allowedSegCountInt + " vs count=" + infosSorted.size() + " (eligible count=" + eligible.size() + ") tooBigCount=" + tooBigCount, writer);
      }

      if (eligible.size() == 0) {
        return spec;
      }

      if (eligible.size() > allowedSegCountInt) {

        // We are over budget -- find the best merge:
        MergeScore bestScore = null;
        List<SegmentCommitInfo> best = null;
        boolean bestTooLarge = false;
        long bestMergeBytes = 0;

        // Consider all possible starting points in the size-sorted list:
        for(int startIdx = 0;startIdx <= eligible.size()-maxMergeAtOnce; startIdx++) {

          long totAfterMergeBytes = 0;

          final List<SegmentCommitInfo> candidate = new ArrayList<>();
          boolean hitTooLarge = false;
          for(int idx = startIdx;idx<eligible.size() && candidate.size() < maxMergeAtOnce;idx++) {
            final SegmentCommitInfo info = eligible.get(idx);
            final long segBytes = size(info, writer);

            if (totAfterMergeBytes + segBytes > maxMergedSegmentBytes) {
              hitTooLarge = true;
              // Keep going, so that smaller segments can still be "packed"
              // into this merge to bring it closer to the max size; this is
              // not perfect bin packing, but it is good enough in practice.
              continue;
            }
            candidate.add(info);
            totAfterMergeBytes += segBytes;
          }

          // The candidate can never be empty: too-large segments were
          // already excluded above.
          assert candidate.size() > 0;

          final MergeScore score = score(candidate, hitTooLarge, mergingBytes, writer);
          if (verbose(writer)) {
            message(" maybe=" + writer.segString(candidate) + " score=" + score.getScore() + " " + score.getExplanation() + " tooLarge=" + hitTooLarge + " size=" + String.format(Locale.ROOT, "%.3f MB", totAfterMergeBytes/1024./1024.), writer);
          }

          // If a max-sized merge is already running, do not allow another
          // max-sized merge to start:
          if ((bestScore == null || score.getScore() < bestScore.getScore()) && (!hitTooLarge || !maxMergeIsRunning)) {
            best = candidate;
            bestScore = score;
            bestTooLarge = hitTooLarge;
            bestMergeBytes = totAfterMergeBytes;
          }
        }

        if (best != null) {
          if (spec == null) {
            spec = new MergeSpecification();
          }
          final OneMerge merge = new OneMerge(best);
          spec.add(merge);
          for(SegmentCommitInfo info : merge.segments) {
            toBeMerged.add(info);
          }

          if (verbose(writer)) {
            message(" add merge=" + writer.segString(merge.segments) + " size=" + String.format(Locale.ROOT, "%.3f MB", bestMergeBytes/1024./1024.) + " score=" + String.format(Locale.ROOT, "%.3f", bestScore.getScore()) + " " + bestScore.getExplanation() + (bestTooLarge ? " [max merge]" : ""), writer);
          }
        } else {
          return spec;
        }
      } else {
        return spec;
      }
    }
  }
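  /** Expert: scores one candidate merge.  The score combines the size
   *  skew of the candidate, a gentle preference for smaller merges, and
   *  a strong preference for merges that reclaim deletions; smaller
   *  scores are better. */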
  protected MergeScore score(List<SegmentCommitInfo> candidate, boolean hitTooLarge, long mergingBytes, IndexWriter writer) throws IOException {
    long totBeforeMergeBytes = 0;
    long totAfterMergeBytes = 0;
    long totAfterMergeBytesFloored = 0;
    for(SegmentCommitInfo info : candidate) {
      final long segBytes = size(info, writer);
      totAfterMergeBytes += segBytes;
      totAfterMergeBytesFloored += floorSize(segBytes);
      totBeforeMergeBytes += info.sizeInBytes();
    }

    // Roughly measure the "skew" of the merge, i.e. how balanced the segment
    // sizes are.  It ranges from 1.0/candidate.size() (perfectly balanced)
    // up to nearly 1.0 (heavily lopsided); lopsided merges are bad because
    // they lead to O(N^2) merge cost over time:
    final double skew;
    if (hitTooLarge) {
      // Pretend the merge has perfect skew; skew doesn't matter in this case
      // because this merge will not "cascade" and so it cannot lead to N^2
      // merge cost over time:
      skew = 1.0/maxMergeAtOnce;
    } else {
      skew = ((double) floorSize(size(candidate.get(0), writer)))/totAfterMergeBytesFloored;
    }

    // Strongly favor merges with less skew (smaller mergeScore is better):
    double mergeScore = skew;

    // Gently favor smaller merges over bigger ones.  The exponent must stay
    // small, else we would do poor merges of small segments just to avoid
    // the large merges:
    mergeScore *= Math.pow(totAfterMergeBytes, 0.05);

    // Strongly favor merges that reclaim deletes:
    final double nonDelRatio = ((double) totAfterMergeBytes)/totBeforeMergeBytes;
    mergeScore *= Math.pow(nonDelRatio, reclaimDeletesWeight);

    final double finalMergeScore = mergeScore;

    return new MergeScore() {

      @Override
      public double getScore() {
        return finalMergeScore;
      }

      @Override
      public String getExplanation() {
        return "skew=" + String.format(Locale.ROOT, "%.3f", skew) + " nonDelRatio=" + String.format(Locale.ROOT, "%.3f", nonDelRatio);
      }
    };
  }
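  /** Selects merges for forceMerge, merging down to at most
   *  {@code maxSegmentCount} segments.  Segments already being merged are
   *  skipped; merges of {@code maxMergeAtOnceExplicit} segments are
   *  emitted while enough segments remain, followed by one final merge
   *  that reaches the requested count. */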
  @Override
  public MergeSpecification findForcedMerges(SegmentInfos infos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer) throws IOException {
    if (verbose(writer)) {
      message("findForcedMerges maxSegmentCount=" + maxSegmentCount + " infos=" + writer.segString(infos) + " segmentsToMerge=" + segmentsToMerge, writer);
    }

    List<SegmentCommitInfo> eligible = new ArrayList<>();
    boolean forceMergeRunning = false;
    final Collection<SegmentCommitInfo> merging = writer.getMergingSegments();
    boolean segmentIsOriginal = false;
    for(SegmentCommitInfo info : infos) {
      final Boolean isOriginal = segmentsToMerge.get(info);
      if (isOriginal != null) {
        segmentIsOriginal = isOriginal;
        if (!merging.contains(info)) {
          eligible.add(info);
        } else {
          forceMergeRunning = true;
        }
      }
    }

    if (eligible.size() == 0) {
      return null;
    }

    if ((maxSegmentCount > 1 && eligible.size() <= maxSegmentCount) ||
        (maxSegmentCount == 1 && eligible.size() == 1 && (!segmentIsOriginal || isMerged(infos, eligible.get(0), writer)))) {
      if (verbose(writer)) {
        message("already merged", writer);
      }
      return null;
    }

    Collections.sort(eligible, new SegmentByteSizeDescending(writer));

    if (verbose(writer)) {
      message("eligible=" + eligible, writer);
      message("forceMergeRunning=" + forceMergeRunning, writer);
    }

    int end = eligible.size();

    MergeSpecification spec = null;

    // Emit full merges of maxMergeAtOnceExplicit segments, smallest segments
    // first, while enough segments remain:
    while(end >= maxMergeAtOnceExplicit + maxSegmentCount - 1) {
      if (spec == null) {
        spec = new MergeSpecification();
      }
      final OneMerge merge = new OneMerge(eligible.subList(end-maxMergeAtOnceExplicit, end));
      if (verbose(writer)) {
        message("add merge=" + writer.segString(merge.segments), writer);
      }
      spec.add(merge);
      end -= maxMergeAtOnceExplicit;
    }

    if (spec == null && !forceMergeRunning) {
      // Emit the final, smaller merge, which brings the segment count down
      // to maxSegmentCount:
      final int numToMerge = end - maxSegmentCount + 1;
      final OneMerge merge = new OneMerge(eligible.subList(end-numToMerge, end));
      if (verbose(writer)) {
        message("add final merge=" + merge.segString(), writer);
      }
      spec = new MergeSpecification();
      spec.add(merge);
    }

    return spec;
  }
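  /** Selects merges for forceMergeDeletes: every segment whose percentage
   *  of deleted documents exceeds {@code forceMergeDeletesPctAllowed}
   *  (and that is not already merging) is merged, largest first, in
   *  groups of up to {@code maxMergeAtOnceExplicit} segments. */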
  @Override
  public MergeSpecification findForcedDeletesMerges(SegmentInfos infos, IndexWriter writer) throws IOException {
    if (verbose(writer)) {
      message("findForcedDeletesMerges infos=" + writer.segString(infos) + " forceMergeDeletesPctAllowed=" + forceMergeDeletesPctAllowed, writer);
    }
    final List<SegmentCommitInfo> eligible = new ArrayList<>();
    final Collection<SegmentCommitInfo> merging = writer.getMergingSegments();
    for(SegmentCommitInfo info : infos) {
      double pctDeletes = 100.*((double) writer.numDeletedDocs(info))/info.info.maxDoc();
      if (pctDeletes > forceMergeDeletesPctAllowed && !merging.contains(info)) {
        eligible.add(info);
      }
    }

    if (eligible.size() == 0) {
      return null;
    }

    Collections.sort(eligible, new SegmentByteSizeDescending(writer));

    if (verbose(writer)) {
      message("eligible=" + eligible, writer);
    }

    int start = 0;
    MergeSpecification spec = null;

    while(start < eligible.size()) {
      // Group the eligible segments into merges of at most
      // maxMergeAtOnceExplicit segments; no maximum merged size is enforced
      // here, since the application explicitly requested forceMergeDeletes:
      final int end = Math.min(start + maxMergeAtOnceExplicit, eligible.size());
      if (spec == null) {
        spec = new MergeSpecification();
      }

      final OneMerge merge = new OneMerge(eligible.subList(start, end));
      if (verbose(writer)) {
        message("add merge=" + writer.segString(merge.segments), writer);
      }
      spec.add(merge);
      start = end;
    }

    return spec;
  }
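  /** Returns the given size, rounded up to {@code floorSegmentBytes} if it is smaller. */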
  private long floorSize(long bytes) {
    return Math.max(floorSegmentBytes, bytes);
  }

  private boolean verbose(IndexWriter writer) {
    return writer != null && writer.infoStream.isEnabled("TMP");
  }

  private void message(String message, IndexWriter writer) {
    writer.infoStream.message("TMP", message);
  }
  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder("[" + getClass().getSimpleName() + ": ");
    sb.append("maxMergeAtOnce=").append(maxMergeAtOnce).append(", ");
    sb.append("maxMergeAtOnceExplicit=").append(maxMergeAtOnceExplicit).append(", ");
    sb.append("maxMergedSegmentMB=").append(maxMergedSegmentBytes/1024/1024.).append(", ");
    sb.append("floorSegmentMB=").append(floorSegmentBytes/1024/1024.).append(", ");
    sb.append("forceMergeDeletesPctAllowed=").append(forceMergeDeletesPctAllowed).append(", ");
    sb.append("segmentsPerTier=").append(segsPerTier).append(", ");
    sb.append("maxCFSSegmentSizeMB=").append(getMaxCFSSegmentSizeMB()).append(", ");
    sb.append("noCFSRatio=").append(noCFSRatio);
    return sb.toString();
  }
}